Use async/await in the NetworkWorker and in build_network_future #5704
Conversation
All tests have passed on the first try, and the node I'm testing this on seems to be behaving properly.
mxinden left a comment:
This looks good to me.
I have a suggestion regarding the `StatusSinks` structure. Hope I am not missing something.
How about following the `futures::sink::Sink` pattern for `StatusSinks`?

`StatusSinks` would have two states, initialized with the latter:

- `Ready(YieldAfter)`
- `Pending`
A user would:

- Call `poll_ready`. In case an outbound sender's interval has fired, it would move the `StatusSinks` to status `Ready(YieldAfter)` and return `Poll::Ready()`.
- Call `start_send`. It would take the `YieldAfter` from the `Ready` status, send the status down the channel, and move the `StatusSinks` status back to `Pending`.
I would find this easier to parse, not because the current implementation is complex or complicated, but because `futures::sink::Sink` is a known, familiar pattern. As a nice-to-have it would relieve us of the `entries_{tx,rx}` channel. If I am not mistaken none of this is performance critical, so I don't understand the pattern of only locking `inner` within `next`.
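For illustration only, here is a minimal sketch of what that two-state `Sink` could look like, assuming a heavily simplified `StatusSinks` driven by a single tick source. `Status`, `YieldAfter`, the tick stream, and the output channel are placeholders rather than the real substrate types.

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::channel::mpsc;
use futures::sink::Sink;
use futures::stream::{BoxStream, StreamExt};

/// Placeholder for the real status report type.
struct Status;

/// Placeholder for the object that knows where to send the status.
struct YieldAfter {
    sender: mpsc::UnboundedSender<Status>,
}

enum State {
    Pending,
    Ready(YieldAfter),
}

struct StatusSinks {
    /// Stands in for the outbound sender's interval.
    ticks: BoxStream<'static, ()>,
    sender: mpsc::UnboundedSender<Status>,
    state: State,
}

impl Sink<Status> for StatusSinks {
    type Error = mpsc::SendError;

    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let this = self.get_mut();
        match this.ticks.poll_next_unpin(cx) {
            // The interval fired: move to `Ready(YieldAfter)` and report readiness.
            Poll::Ready(Some(())) => {
                this.state = State::Ready(YieldAfter { sender: this.sender.clone() });
                Poll::Ready(Ok(()))
            }
            Poll::Ready(None) | Poll::Pending => Poll::Pending,
        }
    }

    fn start_send(self: Pin<&mut Self>, item: Status) -> Result<(), Self::Error> {
        let this = self.get_mut();
        // Take the `YieldAfter` out of the `Ready` state, send the status down
        // the channel, and move back to `Pending`.
        match std::mem::replace(&mut this.state, State::Pending) {
            State::Ready(yield_after) => yield_after
                .sender
                .unbounded_send(item)
                .map_err(|e| e.into_send_error()),
            State::Pending => panic!("start_send called without a successful poll_ready"),
        }
    }

    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }
}
```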
> I don't understand the pattern of only locking `inner` within `next`.
The reason is that we are sharing an `Arc<StatusSinks>` throughout the networking code.

Calling `next()` returns a `Future` that holds a lock on the queue until one element is ready. If we locked in `push` as well, then as long as `next()`'s `Future` is alive (which is 99% of the time), calling `status_sink.push()` would have to wait.

There is no way to get rid of the `entries_tx`/`rx` channel without making the `NetworkService::event_stream()` function blocking or asynchronous (except by replacing it with something vaguely equivalent to a channel, like a `Vec` plus a task notifier).
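To make the trade-off concrete, a minimal sketch of the pattern being described, with simplified, hypothetical types (the real `StatusSinks` tracks interval-driven sinks rather than plain entries): `push` only goes through the channel and never touches the lock, while `next` is the only place that takes the futures-aware mutex.

```rust
use futures::channel::mpsc;
use futures::lock::Mutex;
use futures::stream::StreamExt;

/// Simplified stand-in for the shared status-sink collection.
struct StatusSinks<T> {
    entries_tx: mpsc::UnboundedSender<T>,
    inner: Mutex<mpsc::UnboundedReceiver<T>>,
}

impl<T> StatusSinks<T> {
    fn new() -> Self {
        let (entries_tx, entries_rx) = mpsc::unbounded();
        StatusSinks { entries_tx, inner: Mutex::new(entries_rx) }
    }

    /// Only goes through the channel: works with `&self`, never blocks, and
    /// never has to wait for the lock held by an alive `next()` future.
    fn push(&self, entry: T) {
        let _ = self.entries_tx.unbounded_send(entry);
    }

    /// The only place the futures-aware mutex is locked; two concurrent
    /// `next()` calls simply take turns.
    async fn next(&self) -> T {
        let mut rx = self.inner.lock().await;
        rx.next()
            .await
            .expect("`entries_tx` is kept alive in `self`, so the stream never ends")
    }
}
```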
As for the `Sink` suggestion, we can't really do that.

We call `status_sink::next()` in the `NetworkWorker::next_action` async method. But if something else is ready (like the import queue or a network message), we drop the `Future` returned by `next()` and re-acquire it the next time `next_action` is called.

If we were to turn that into a `Sink`, destroying that future would drop the element that we are trying to send. It would also mean that the element we send might already be obsolete by the time we actually send it.
```rust
if let Some(v) = entries.next().await {
    v
} else {
    loop {
```
Hopefully not a dumb question: What is this loop for?
I'm not actually sure whether this loop is necessary. I think `pending!` is like `std::thread::yield_now` and can return.
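A small sketch of the behaviour being discussed (based on my reading of `futures::pending!`, so treat the details as an assumption): the macro suspends the current poll exactly once and execution resumes right after it on the next poll, while the surrounding loop keeps the future pending on every poll.

```rust
use futures::pending;

async fn yields_once() {
    // Returns `Poll::Pending` from the current poll, then resumes right here
    // the next time the task is polled.
    pending!();
    // ... execution continues ...
}

async fn never_completes() {
    loop {
        // Suspended again on every poll, so this future never resolves,
        // much like `futures::future::pending::<()>()`.
        pending!();
    }
}
```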
The fact that this CI test failed and I've never seen it fail before unfortunately doesn't inspire confidence.

Putting on ice as per previous comment.
Closing this for the same reason as #5763 |
Work towards paritytech/polkadot-sdk#557
Refactors the `NetworkWorker` and `build_network_future` functions to use async/await.

First of all, I've had to do three side-changes:

- `SmartImportQueue`, to bypass its weird API which isn't really usable from async/await.
- `status_sink` no longer has methods that require `&mut self`. Instead it uses a channel to add elements to the queue, and an internal `Mutex` to protect the queue. That mutex is a futures mutex and is only locked when we're polling for new events. In other words, if we call `next()` twice, only one will run at a time, but that's a very desired behaviour.
- The `NetworkWorker::from_worker` channel has been properly renamed to `from_service`.

The rest of the PR converts the `NetworkWorker::poll()` function into a `NetworkWorker::next_action` async method whose role is to process the network worker, and turns `build_network_future` into an async function.

These two big refactors were actually very straightforward, as I just replaced the various polling with branches of the `futures::select!` macro.
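A rough, self-contained sketch of that shape (the message types and channels below are hypothetical stand-ins, not the actual networking code): each source that `NetworkWorker::poll()` used to poll by hand becomes one branch of a `futures::select!` inside the async `next_action`, and the driving future becomes a loop that awaits it.

```rust
use futures::channel::mpsc;
use futures::{future::FutureExt, select, stream::StreamExt};

// Hypothetical stand-ins for the real message types.
struct ServiceToWorkerMsg;
struct ImportedBlock;

/// One "turn" of the worker: wait for whichever event source fires first.
async fn next_action(
    from_service: &mut mpsc::UnboundedReceiver<ServiceToWorkerMsg>,
    import_queue: &mut mpsc::UnboundedReceiver<ImportedBlock>,
) {
    select! {
        _msg = from_service.next().fuse() => {
            // Handle a message sent through the `NetworkService` handle.
        },
        _block = import_queue.next().fuse() => {
            // Handle a block coming out of the import queue.
        },
    }
}

/// The driving loop that the network future boils down to in this sketch.
async fn run(
    mut from_service: mpsc::UnboundedReceiver<ServiceToWorkerMsg>,
    mut import_queue: mpsc::UnboundedReceiver<ImportedBlock>,
) {
    loop {
        next_action(&mut from_service, &mut import_queue).await;
    }
}
```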